package com.xlz.worker;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xlz.domain.AppTask;
import com.xlz.domain.ShardItem;
import com.xlz.manager.DefaultDstManager;
import com.xlz.statistics.ExecuteStatisticCache;
import com.xlz.statistics.SelectStatisticCache;
import com.xlz.util.ReflectionUtils;

/**
 * Template base class for a synchronous distributed-scheduling worker group.
 *
 * <p>Life cycle as implemented here:
 * <ol>
 *   <li>{@link #setAppTask(AppTask)} and the other setters configure the worker;</li>
 *   <li>{@link #init()} marks the worker as running and, unless the task is configured as
 *       "single thread + custom execute method", creates the thread pool and the task queue;</li>
 *   <li>{@link #doSelect()} runs one scheduling cycle: it either invokes the custom method by
 *       reflection, or loads items through {@link #select()}, queues them and lets the pool
 *       threads drain the queue through {@link #execute(Object)};</li>
 *   <li>{@link #shutdown()} / {@link #shutdownNow()} stop the worker.</li>
 * </ol>
 *
 * <p>NOTE(review): {@link #doSelect()} holds this object's monitor while it waits for the worker
 * threads, so the other {@code synchronized} methods ({@link #shutdown()},
 * {@link #shutdownNow()}, {@link #flushStatistic()}) block until the running cycle has finished.
 *
 * @param <E> type of the items returned by {@link #select()} and handled by
 *            {@link #execute(Object)}
 * @author 张蕾蕾
 * @date 2018 03 24
 */
public abstract class AbstractWorker<E> implements Worker {
	
	/** Upper bound applied to {@link #threadPoolSize} when the pool is created. */
	private static final int MAX_THREAD_POOL_SIZE = 20;
	/** Capacity of the bounded task queue. */
	private static final int TASK_POOL_CAPACITY = 50000;
	
	/**
	 * Logger bound to the concrete subclass.
	 */
	protected final Logger LOG = LoggerFactory.getLogger(getClass());
	
	// Thread pool that runs the worker loops (null in single-thread custom-method mode).
	private ExecutorService threadPool ;
	// Queue of loaded items waiting to be executed (null in single-thread custom-method mode).
	private ArrayBlockingQueue<E> taskPool ;
	// Number of threads in the pool; clamped to 1..MAX_THREAD_POOL_SIZE by init().
	private int threadPoolSize = 20; 
	
	private volatile ShardItem []shardItems;
	private volatile Integer shardItemCount;
	
	private volatile boolean isRunning = false;
	private volatile boolean isStoped = true;
	// Configuration of the task handled by this worker.
	private AppTask appTask;
	// Latch of the current cycle; one count per worker loop started by doSelect().
	private CountDownLatch countDownLatch;
	// Target object of the reflective custom-method call (this instance).
	private Worker currentDealBean;
	// Statistics caches.
	private SelectStatisticCache selectStatistic = new SelectStatisticCache();
	private ExecuteStatisticCache executeStatistic = new ExecuteStatisticCache();
	
	private DefaultDstManager scheduleManager;
	private String registerWorker ;
	
	public AbstractWorker(){
		currentDealBean = this;
	}
	
	/**
	 * Starts the worker. Requires {@link #setAppTask(AppTask)} to have been called.
	 *
	 * <p>In every mode the worker is flagged as running; the thread pool and the task queue are
	 * only created when the task is not a "single thread + custom execute method" task, because
	 * that mode is executed directly by {@link #doSelect()}.
	 */
	public final void init(){
		if(!singleThreadCustomMode()){
			// Executors.newFixedThreadPool rejects sizes below 1, so clamp on both sides.
			threadPoolSize = Math.max(1, Math.min(threadPoolSize, MAX_THREAD_POOL_SIZE));
			threadPool = Executors.newFixedThreadPool(threadPoolSize);
			taskPool = new ArrayBlockingQueue<>(TASK_POOL_CAPACITY);
		}
		// Set outside the branch: doSelect() returns immediately while isRunning is false,
		// which would otherwise make the single-thread custom-method path unreachable.
		isRunning = true;
		isStoped = false;
	}
	
	/**
	 * Runs one scheduling cycle.
	 *
	 * <p>Does nothing when the task is not ready (readiness != 1) or the worker is not running.
	 * Otherwise either invokes the configured custom method by reflection (single-thread mode),
	 * or loads items with {@link #select()}, queues them, starts {@code threadPoolSize} worker
	 * loops and blocks until all of them have finished. Any exception is logged and recorded as
	 * a failed load in the select statistics; it is never propagated to the caller.
	 */
	public synchronized final void doSelect() {
		// Initialised up front so that the failure statistic never reports "now - 0".
		long startTime = System.currentTimeMillis();
		try {
			if(!readyAndRunning()){
				return;
			}
			if(singleThreadCustomMode()){
				// Invoke the configured method by reflection on this worker.
				ReflectionUtils.invokeMethod(currentDealBean, appTask.getExecuteMethod(), null, null);
				return;
			}
			startTime = System.currentTimeMillis();
			List<E> list = select();
			long endTime = System.currentTimeMillis();
			if(list == null){
				throw new IllegalStateException("加载到的数据为null");
			}
			// Record the load statistics.
			buildStatistic(endTime - startTime, list.size(), 1,"");
			// Log how many items were loaded.
			LOG.debug("当前加载的待处理 数据总条数：{}" ,list.size());
			enqueueLoadedItems(list);
			startWorkersAndAwait();
		} catch (Exception e) {
			LOG.error("执行调度方法异常,appTask={}",appTask,e);
			long endTime = System.currentTimeMillis();
			// Record the failed load.
			buildStatistic(endTime - startTime, 0, 0,e.getMessage());
		}
	}
	
	/** Queues the loaded items, stopping early when the worker is stopping or the queue is full. */
	private void enqueueLoadedItems(List<E> list){
		int offered = 0;
		for(E obj : list){
			if(!poolAcceptingWork()){
				break;
			}
			if(taskPool.remainingCapacity() == 0){
				// appendTask() uses add(), which throws on a full queue; stop instead of
				// aborting the whole cycle with items already queued.
				LOG.warn("Task queue is full (capacity {}); {} loaded item(s) were not queued in this cycle",
						TASK_POOL_CAPACITY, list.size() - offered);
				break;
			}
			appendTask(obj);
			offered++;
		}
	}
	
	/** Starts one worker loop per pool thread and waits until each has counted the latch down. */
	private void startWorkersAndAwait(){
		final int workers = threadPoolSize;
		countDownLatch = new CountDownLatch(workers);
		for(int i = 0;i < workers;i++){
			doExecute();
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			LOG.error("等待任务处理超时异常",e);
			// Keep the interrupt visible to the caller's thread.
			Thread.currentThread().interrupt();
		}
	}
	
	/** Forwards one load result to the select statistics cache. */
	private void buildStatistic(long loadDataTime, long loadDataTotal, 
			int loadDataFlag,String errorContent){
		selectStatistic.doStatistic(appTask.getAppNo(),appTask.getId(),registerWorker,loadDataTime, loadDataTotal, loadDataFlag, errorContent);
	}
	
	/**
	 * Submits one worker loop to the thread pool.
	 *
	 * <p>The latch of the current cycle is counted down exactly once per call: immediately when
	 * the loop cannot be started (no pool, pool shut down, task not ready, worker not running,
	 * or submission rejected), otherwise when the submitted loop ends, however it ends.
	 */
	@Override
	public synchronized final void doExecute(){
		// Bind the loop to the latch of the cycle that started it.
		final CountDownLatch latch = countDownLatch;
		if(threadPool == null || !poolAcceptingWork()){
			countDownQuietly(latch);
			return;
		}
		try {
			threadPool.submit(new Runnable(){
				@Override
				public void run() {
					try {
						runWorkerLoop();
					} finally {
						// Always release the waiter in doSelect(), even on an Error.
						countDownQuietly(latch);
					}
				}
			});
		} catch (Exception e) {
			// The task was not accepted, so it will never count down by itself.
			LOG.error("Failed to submit worker loop to the thread pool", e);
			countDownQuietly(latch);
		}
	}
	
	/**
	 * Body of one worker loop: runs the custom method when one is configured, otherwise drains
	 * the task queue through {@link #execute(Object)} and updates the execute statistics.
	 */
	private void runWorkerLoop(){
		if(customMethodConfigured()){
			if(readyAndRunning()){
				// Invoke the configured method by reflection on this worker.
				try {
					ReflectionUtils.invokeMethod(currentDealBean, appTask.getExecuteMethod(), null, null);
				} catch (Exception e) {
					LOG.error("处理任务异常：",e);
				}
			}
			return;
		}
		// Check the stop conditions before polling so that no dequeued item is dropped.
		while(poolAcceptingWork()){
			E obj = taskPool.poll();
			if(obj == null){
				break;
			}
			try{
				long startTime = System.currentTimeMillis();
				execute( obj);
				long endTime = System.currentTimeMillis();
				executeStatistic.addExecuteDataTime(endTime - startTime);
				executeStatistic.addExecuteSuccessTotal(1);
			}catch(Exception e){
				executeStatistic.addExecuteFailTotal(1);
				LOG.error("处理任务异常：",e);
			}
		}
	}
	
	/** @return true when a non-empty custom execute method is configured on the task */
	private boolean customMethodConfigured(){
		return appTask.getExecuteMethod() != null && !"".equals(appTask.getExecuteMethod());
	}
	
	/** @return true when the task uses exactly one thread and a custom execute method */
	private boolean singleThreadCustomMode(){
		return appTask.getSingleTaskThreadCount().intValue() == 1 && customMethodConfigured();
	}
	
	/** @return true when the task readiness is 1 and the worker has not been stopped */
	private boolean readyAndRunning(){
		return appTask.getReadiness().intValue() == 1 && isRunning;
	}
	
	/** @return true when the pool is not shut down and {@link #readyAndRunning()} holds */
	private boolean poolAcceptingWork(){
		return !threadPool.isShutdown() && readyAndRunning();
	}
	
	/** Counts the latch down when there is one (doExecute() may be called outside a cycle). */
	private static void countDownQuietly(CountDownLatch latch){
		if(latch != null){
			latch.countDown();
		}
	}
	
	/**
	 * Loads the items to process in one cycle. The default implementation returns
	 * {@code null}, which {@link #doSelect()} treats as a failed load.
	 *
	 * @return the loaded items
	 * @throws Exception when loading fails; handled by {@link #doSelect()}
	 */
	protected List<E> select()throws Exception{return null;}
	
	/**
	 * Processes one item. The default implementation does nothing.
	 *
	 * @param obj the item taken from the task queue
	 * @throws Exception when processing fails; counted as a failed execution
	 */
	protected void execute(E obj) throws Exception{}
	
	/**
	 * Adds one item to the task queue.
	 *
	 * @param obj the item to queue; {@code null} is ignored
	 * @return {@code false} when {@code obj} is {@code null}, {@code true} when it was queued
	 * @throws IllegalStateException when the queue is full
	 */
	protected boolean appendTask(E obj){
		if(obj == null){
			// Nothing to queue.
			return false;
		}
		return taskPool.add(obj);
	}
	
	/**
	 * Stops the worker immediately: clears the queue, calls {@code shutdownNow()} on the pool
	 * and updates the running/stopped flags the same way {@link #shutdown()} does.
	 */
	public synchronized void shutdownNow(){
		isRunning = false;
		if(taskPool != null)
			taskPool.clear();
		if(threadPool != null && !threadPool.isShutdown())
			threadPool.shutdownNow();
		isStoped = true;
	}
	
	/**
	 * Stops the worker: clears the queue and, on a separate thread, calls {@code shutdown()} on
	 * the pool and then sets the stopped flag. Does nothing when the worker is not running.
	 */
	public synchronized void shutdown(){
		if(!isRunning){
			return;
		}
		isRunning = false;
		if(taskPool != null)
			taskPool.clear();
		// May be null in single-thread custom-method mode.
		final ExecutorService pool = threadPool;
		new Thread(){
			@Override
			public void run(){
				long start = System.currentTimeMillis();
				if(pool != null && !pool.isShutdown())
					pool.shutdown();
				isStoped = true;
				long end = System.currentTimeMillis();
				LOG.debug("安全停止工作组{}成功，耗时{}ms,线程池当前状态{},",
						getRegisterWorker(),(end-start),pool == null || pool.isShutdown());
			}
		}.start();
	}

	/**
	 * Parses the shard assignment of this worker.
	 *
	 * @param currShardItems items separated by {@code |}, each of the form
	 *                       {@code <integer>#<text>}; {@code null} or empty yields no items
	 */
	public void setShardItems(String currShardItems) {
		if(currShardItems == null || currShardItems.isEmpty()){
			this.shardItems = new ShardItem[0];
			return;
		}
		String [] items = currShardItems.split("\\|");
		// Fill a local array first so readers of the volatile field never see a partial result.
		ShardItem [] parsed = new ShardItem[items.length];
		for(int i = 0;i < items.length;i ++){
			String params[] = items[i].split("#");
			parsed[i] = new ShardItem(Integer.valueOf(params[0]),params[1]);
		}
		this.shardItems = parsed;
	}

	/**
	 * Flushes the cached select and execute statistics through their caches'
	 * {@code flushToDb} methods.
	 */
	public synchronized void flushStatistic() {
		selectStatistic.flushToDb();
		executeStatistic.flushToDb(this.appTask.getId(),registerWorker);
	}
	
	public ShardItem[] getShardItems() {
		return shardItems;
	}
	public Integer getShardItemCount() {
		return shardItemCount;
	}
	@Override
	public void setAppTask(AppTask appTask) {
		this.appTask = appTask;
	}
	@Override
	public AppTask getAppTask() {
		return appTask;
	}
	@Override
	public void setShardItemCount(Integer shardItemCount) {
		this.shardItemCount = shardItemCount;
	}
	@Override
	public void setRegisterWorker(String registerWorker) {
		this.registerWorker = registerWorker;
	}
	/**
	 * Sets the requested pool size; {@link #init()} clamps it to
	 * 1..{@value #MAX_THREAD_POOL_SIZE} when the pool is created.
	 */
	@Override
	public void setThreadPoolSize(int threadPoolSize) {
		this.threadPoolSize = threadPoolSize;
	}
	/** Stores the manager and hands it to both statistics caches. */
	@Override
	public void setScheduleManager(DefaultDstManager scheduleManager) {
		this.scheduleManager = scheduleManager;
		selectStatistic.setScheduleManager(scheduleManager);
		executeStatistic.setScheduleManager(scheduleManager);
	}
	@Override
	public boolean isStoped() {
		return isStoped;
	}

	public boolean isRunning() {
		return isRunning;
	}

	public String getRegisterWorker() {
		return registerWorker;
	}

}
